Hand-Writing an RPC Client and Server with Netty for Remote Interface Calls


2024-07-07 21:24 | Source: compiled from the web

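Java I/O comes in three models: BIO, NIO, and AIO (NIO.2). They mean the following:

Java BIO: synchronous and blocking. The server uses one thread per connection: every client connection request makes the server start a thread to handle it. If the connection then does nothing, that thread is pure overhead, although a thread pool can soften the cost.

Java NIO: synchronous and non-blocking. The server uses one thread per request: client connections are registered with a multiplexer (selector), and a thread is started to handle a connection only when the selector's polling finds an I/O request ready on it.

Java AIO (NIO.2): asynchronous and non-blocking. The server uses one thread per completed request: the OS finishes the client's I/O operation first and then notifies the server application to start a thread to process it.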
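To make the NIO description concrete, here is a minimal sketch using only the JDK. A single thread registers the listening channel with a Selector and reacts only to channels that are ready. The class NioEchoSketch and its methods are hypothetical names for illustration, and the sketch assumes the whole message arrives in one read, which holds for a short message on loopback but not in general.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical demo: one selector thread multiplexes accept and read events. */
class NioEchoSketch {

    /** Accepts one connection, echoes one message back, then returns. */
    static void serveOnce(ServerSocketChannel server) throws IOException {
        try (Selector selector = Selector.open()) {
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            boolean done = false;
            while (!done) {
                selector.select(); // blocks until at least one channel is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel client = server.accept();
                        if (client != null) {
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buf = ByteBuffer.allocate(256);
                        if (client.read(buf) > 0) {
                            buf.flip();
                            while (buf.hasRemaining()) {
                                client.write(buf); // echo the bytes back
                            }
                        }
                        client.close();
                        done = true;
                    }
                }
            }
        }
    }

    /** Starts the sketch server on an ephemeral loopback port and sends it one message. */
    static String roundTrip(String message) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            Thread serverThread = new Thread(() -> {
                try {
                    serveOnce(server);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            serverThread.setDaemon(true);
            serverThread.start();
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
                ByteBuffer reply = ByteBuffer.allocate(256);
                while (reply.hasRemaining() && client.read(reply) != -1) {
                    // keep reading until the server closes the connection
                }
                serverThread.join();
                reply.flip();
                return StandardCharsets.UTF_8.decode(reply).toString();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Netty wraps this selector loop in its EventLoopGroup, so the rest of the article never touches Selector directly.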

Netty is the most commonly used NIO framework, and Dubbo also builds its RPC remote calls on it. Below I write a remote-call example with Netty.

Project structure:

The project is divided into a client (wyjm-netty-client), a server (wyjm-netty-server), and a common module (wyjm-netty-common).

Client (wyjm-netty-client):

    wyjm-netty-client-bean: handles the consumer's configuration parameters

    ConsumerConfigure.java: the consumer's configuration entity. Its values are set in consumer.xml.

package com.jd.wyjm.netty.client.bean.model;

import lombok.Data;

@Data
public class ConsumerConfigure {
    /** IP address */
    private String ip;
    /** Port */
    private Integer port;
    /** Interface class name */
    private String interfaceClass;
    /** Number of retries */
    private Integer retries;
}

ConsumerBeanDefinitionParser.java: parses the consumer configuration entity from the elements in consumer.xml.

package com.jd.wyjm.netty.client.bean.schema;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.w3c.dom.Element;

public class ConsumerBeanDefinitionParser implements BeanDefinitionParser {
    private final Class<?> beanClass;

    public ConsumerBeanDefinitionParser(Class<?> beanClass) {
        this.beanClass = beanClass;
    }

    @Override
    public BeanDefinition parse(Element element, ParserContext parserContext) {
        RootBeanDefinition beanDefinition = new RootBeanDefinition();
        beanDefinition.setBeanClass(beanClass);
        beanDefinition.setLazyInit(false);
        // Copy the XML attributes onto the bean's properties
        beanDefinition.getPropertyValues().add("ip", element.getAttribute("ip"));
        beanDefinition.getPropertyValues().add("port", element.getAttribute("port"));
        beanDefinition.getPropertyValues().add("interfaceClass", element.getAttribute("interfaceClass"));
        beanDefinition.getPropertyValues().add("retries", element.getAttribute("retries"));
        // Register the bean in the BeanDefinitionRegistry under the element's id
        BeanDefinitionRegistry beanDefinitionRegistry = parserContext.getRegistry();
        beanDefinitionRegistry.registerBeanDefinition(element.getAttribute("id"), beanDefinition);
        return beanDefinition;
    }
}

ConsumerNamespaceHandler.java: registers the parser so that the bean ends up in the Spring container.

package com.jd.wyjm.netty.client.bean.schema;

import com.jd.wyjm.netty.client.bean.model.ConsumerConfigure;
import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public class ConsumerNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        registerBeanDefinitionParser("consumer", new ConsumerBeanDefinitionParser(ConsumerConfigure.class));
    }
}

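netty-client.xsd: defines the nodes allowed in the XML configuration file.

spring.handlers: tells Spring to load ConsumerNamespaceHandler for the custom namespace.

http\://wyjm.netty.com/schema=com.jd.wyjm.netty.client.bean.schema.ConsumerNamespaceHandler

spring.schemas: maps the schema URL used by consumer.xml to the bundled XSD file.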

http\://wyjm.netty.com/schema/netty-client.xsd=META-INF/netty-client.xsd

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-client-bean
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-common:1.0-SNAPSHOT
        org.springframework:spring-beans:3.1.1.RELEASE

    wyjm-netty-client-core: the client's core consumer logic

   ClientChannelHandlerAdapter.java: client-side channel handler adapter

package com.jd.wyjm.netty.client.core.adapter;

import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ClientChannelHandlerAdapter extends ChannelHandlerAdapter {
    /** Metadata of the method being invoked */
    private MethodInvokeMeta methodInvokeMeta;
    /** Client channel initializer that collects the response */
    private ClientChannelInitializer customChannel;

    public ClientChannelHandlerAdapter(MethodInvokeMeta methodInvokeMeta, ClientChannelInitializer customChannel) {
        this.methodInvokeMeta = methodInvokeMeta;
        this.customChannel = customChannel;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("Client exception: {}", cause.getMessage(), cause);
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Do not log toString() calls
        if (!"toString".equals(methodInvokeMeta.getMethodName())) {
            log.info("Client sending arguments: {}, return type: {}",
                    methodInvokeMeta.getArgs(), methodInvokeMeta.getReturnType());
        }
        // Send the request as soon as the connection is established
        ctx.writeAndFlush(methodInvokeMeta);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        customChannel.setResponse(msg);
    }
}

ClientChannelInitializer.java: initializes the client channel pipeline.

package com.jd.wyjm.netty.client.core.adapter;

import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;
import com.jd.wyjm.netty.common.entity.NullWritable;
import com.jd.wyjm.netty.common.entity.ObjectCodec;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

public class ClientChannelInitializer extends ChannelInitializer<Channel> {
    private MethodInvokeMeta methodInvokeMeta;
    // Written by the event loop thread, read by the calling thread
    private volatile Object response;

    public ClientChannelInitializer(MethodInvokeMeta methodInvokeMeta) {
        this.methodInvokeMeta = methodInvokeMeta;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // Outbound: prepend a 2-byte length field to every message
        pipeline.addLast(new LengthFieldPrepender(2));
        // Inbound: split the stream on the 2-byte length field and strip it
        pipeline.addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 2, 0, 2));
        pipeline.addLast(new ObjectCodec());
        pipeline.addLast(new ClientChannelHandlerAdapter(methodInvokeMeta, this));
    }

    public Object getResponse() {
        // NullWritable is the wire representation of a null return value
        if (response instanceof NullWritable) {
            return null;
        }
        return response;
    }

    public void setResponse(Object response) {
        this.response = response;
    }
}

NettyProxyFactoryBean.java: dynamic proxy for the service interface

package com.jd.wyjm.netty.client.core.proxy;

import com.jd.wyjm.netty.client.core.NettyClient;
import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;
import com.jd.wyjm.netty.common.utils.WrapMethodUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.config.AbstractFactoryBean;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

@Slf4j
public class NettyProxyFactoryBean extends AbstractFactoryBean<Object> implements InvocationHandler {
    /** IP address */
    private String ip;
    /** Port */
    private int port;
    /** Number of retries */
    private int retries;
    /** Interface class */
    private Class<?> interfaceClass;
    /** Remote-call client */
    private NettyClient nettyClient;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        final MethodInvokeMeta methodInvokeMeta = WrapMethodUtils.readMethod(interfaceClass, method, args);
        if (!methodInvokeMeta.getMethodName().equals("toString")) {
            log.info("[invoke] interface: {}, method: {}, arguments: {}, parameter types: {}, return type: {}",
                    methodInvokeMeta.getInterfaceClass(), methodInvokeMeta.getMethodName(),
                    methodInvokeMeta.getArgs(), methodInvokeMeta.getParameterTypes(),
                    methodInvokeMeta.getReturnType());
        }
        // Start with retry counter 0
        return nettyClient.remoteCall(methodInvokeMeta, ip, 0, port, retries);
    }

    @Override
    public Class<?> getObjectType() {
        return this.interfaceClass;
    }

    @Override
    protected Object createInstance() throws Exception {
        log.info("[proxy factory] initializing proxy bean: {}", this.interfaceClass);
        return Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, this);
    }

    public void setInterfaceClass(Class<?> interfaceClass) {
        this.interfaceClass = interfaceClass;
    }

    public void setNettyClient(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public void setRetries(int retries) {
        this.retries = retries;
    }
}

NettyProxyFactoryRegisterProcessor.java: registers each configured interface as a dynamic proxy.

package com.jd.wyjm.netty.client.core.proxy;

import com.jd.wyjm.netty.client.bean.model.ConsumerConfigure;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.stereotype.Component;

@Component
public class NettyProxyFactoryRegisterProcessor implements BeanFactoryPostProcessor {
    private DefaultListableBeanFactory beanFactory;

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory)
            throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) configurableListableBeanFactory;
        String[] beanNames = configurableListableBeanFactory.getBeanNamesForType(ConsumerConfigure.class);
        for (String beanName : beanNames) {
            ConsumerConfigure consumer = (ConsumerConfigure) configurableListableBeanFactory.getBean(beanName);
            // One NettyProxyFactoryBean per consumer entry, named after the interface
            BeanDefinitionBuilder beanDefinitionBuilder =
                    BeanDefinitionBuilder.genericBeanDefinition(NettyProxyFactoryBean.class);
            beanDefinitionBuilder.addPropertyValue("interfaceClass", consumer.getInterfaceClass());
            beanDefinitionBuilder.addPropertyValue("ip", consumer.getIp());
            beanDefinitionBuilder.addPropertyValue("port", consumer.getPort());
            beanDefinitionBuilder.addPropertyValue("retries", consumer.getRetries());
            beanDefinitionBuilder.addPropertyReference("nettyClient", "nettyClient");
            this.beanFactory.registerBeanDefinition(consumer.getInterfaceClass(),
                    beanDefinitionBuilder.getRawBeanDefinition());
        }
    }
}

NettyClient.java: the remote-call client

package com.jd.wyjm.netty.client.core;

import com.jd.wyjm.netty.client.core.adapter.ClientChannelInitializer;
import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

@Slf4j
@Component
public class NettyClient implements InitializingBean {
    /** Client bootstrap */
    private Bootstrap bootstrap;
    /** Client worker group */
    private EventLoopGroup worker;

    @PreDestroy
    public void close() {
        log.info("Releasing resources");
        worker.shutdownGracefully();
    }

    /**
     * Performs a remote call.
     *
     * @param cmd     metadata of the method to invoke
     * @param ip      server IP address
     * @param retry   number of attempts that have already failed
     * @param port    server port
     * @param retries maximum number of retries
     * @return the value returned by the remote method
     */
    public Object remoteCall(final MethodInvokeMeta cmd, String ip, int retry, int port, int retries) {
        try {
            ClientChannelInitializer customChannel = new ClientChannelInitializer(cmd);
            // Clone the shared bootstrap so concurrent calls do not overwrite each other's handler
            Bootstrap callBootstrap = bootstrap.clone().handler(customChannel);
            ChannelFuture channelFuture = callBootstrap.connect(ip, port).sync();
            // The server closes the connection after writing the response
            channelFuture.channel().closeFuture().sync();
            return customChannel.getResponse();
        } catch (Exception e) {
            // sync() rethrows connection failures as well as InterruptedException
            retry++;
            if (retry > retries) {
                throw new RuntimeException("Remote call failed", e);
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e1) {
                Thread.currentThread().interrupt();
            }
            log.info("Attempt {} failed, retrying", retry);
            return remoteCall(cmd, ip, retry, port, retries);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        bootstrap = new Bootstrap();
        worker = new NioEventLoopGroup();
        bootstrap.group(worker);
        bootstrap.channel(NioSocketChannel.class);
    }
}
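The core trick in NettyProxyFactoryBean is the JDK dynamic proxy: the caller holds an object that implements the service interface, and every method call lands in a single InvocationHandler that knows the method name and arguments. The following is a minimal sketch of that mechanism with the network removed. ProxySketch, GreetingService, and describeCall are hypothetical names, and describeCall merely stands in for nettyClient.remoteCall.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

class ProxySketch {

    /** Hypothetical service interface standing in for UserRpcService. */
    interface GreetingService {
        String greet(String name);
    }

    /** Stand-in for the network hop: returns a description of the call that would be sent. */
    static Object describeCall(Class<?> iface, Method method, Object[] args) {
        return iface.getSimpleName() + "#" + method.getName() + Arrays.toString(args);
    }

    /** Creates a proxy whose interface methods are all routed through describeCall. */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Keep the methods inherited from Object local instead of "sending" them
            if (method.getDeclaringClass() == Object.class) {
                switch (method.getName()) {
                    case "toString":
                        return "Proxy(" + iface.getName() + ")";
                    case "hashCode":
                        return System.identityHashCode(proxy);
                    case "equals":
                        return proxy == args[0];
                    default:
                        throw new UnsupportedOperationException(method.getName());
                }
            }
            return describeCall(iface, method, args);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        GreetingService service = createProxy(GreetingService.class);
        System.out.println(service.greet("Tom")); // prints "GreetingService#greet[Tom]"
    }
}
```

In the article's handler every call, including toString, goes through nettyClient.remoteCall. Keeping the Object methods local, as this sketch does, is one possible refinement rather than something the original code implements.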

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-client-core
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-common:1.0-SNAPSHOT
        com.jd.wyjm.netty:wyjm-netty-client-bean:1.0-SNAPSHOT
        org.springframework.boot:spring-boot-starter-web

wyjm-netty-client-web: the client's startup application

WebApplication.java: application entry point

package com.jd.wyjm.netty.client.web.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ComponentScan(basePackages = {
        "com.jd.wyjm.netty.common.*",
        "com.jd.wyjm.netty.client.core",
        "com.jd.wyjm.netty.client.core.*",
        "com.jd.wyjm.netty.client.web.*"
})
@ImportResource(value = {"classpath:config/consumer.xml"})
public class WebApplication extends SpringBootServletInitializer {

    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
        return builder.sources(WebApplication.class);
    }
}

consumer.xml: RPC interface configuration

UserServiceImplTest.java: unit test class.

package com.jd.wyjm.netty.client.web.test;

import com.jd.wyjm.netty.client.web.app.WebApplication;
import com.jd.wyjm.netty.server.service.UserRpcService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = WebApplication.class)
public class UserServiceImplTest {
    // Injected as the dynamic proxy registered by NettyProxyFactoryRegisterProcessor
    @Autowired
    private UserRpcService userRpcService;

    @Test
    public void insertUser() {
        System.out.println(userRpcService.inserUser("单纯的心"));
    }
}

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-client-web
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-client-core:1.0-SNAPSHOT
        com.jd.wyjm.netty:wyjm-netty-server-api:1.0-SNAPSHOT
        junit:junit:4.12 (scope: test)

pom.xml: the client's parent pom

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-client
    packaging: pom
    modules: wyjm-netty-client-bean, wyjm-netty-client-core, wyjm-netty-client-web

    This completes the client-side RPC calling code.

Server (wyjm-netty-server):

  wyjm-netty-server-api: the API exposed to callers. By design, this module depends on no other module.

UserRpcService.java: user service interface

package com.jd.wyjm.netty.server.service;

public interface UserRpcService {
    /**
     * Inserts a user record.
     *
     * @param userName the user's name
     */
    Long inserUser(String userName);
}

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-server-api

  wyjm-netty-server-core: the core module, which can be packaged as a jar.

 ServerChannelHandlerAdapter.java: server-side channel handler adapter

package com.jd.wyjm.netty.server.core.adapter;

import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Sharable
@Slf4j
public class ServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /** Request dispatcher */
    @Autowired
    private ServerRequestDispatcher dispatcher;

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Server exception: {}", cause.getMessage(), cause);
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        MethodInvokeMeta invokeMeta = (MethodInvokeMeta) msg;
        // Do not log toString() calls
        if (!"toString".equals(invokeMeta.getMethodName())) {
            log.info("Arguments from client: {}, return type: {}",
                    invokeMeta.getArgs(), invokeMeta.getReturnType());
        }
        dispatcher.dispatcher(ctx, invokeMeta);
    }
}

ServerRequestDispatcher.java: request dispatcher

package com.jd.wyjm.netty.server.core.adapter;

import com.jd.wyjm.netty.common.core.CommonResult;
import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;
import com.jd.wyjm.netty.common.entity.NullWritable;
import com.jd.wyjm.netty.common.enums.CodeEnum;
import com.jd.wyjm.netty.server.core.config.NettyProperties;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Request dispatcher
 *
 * @author zhangqiang35
 */
@Component
public class ServerRequestDispatcher implements ApplicationContextAware, InitializingBean {
    /** Thread pool that runs the business methods */
    private ExecutorService executor = null;
    /** Spring application context */
    private ApplicationContext applicationContext;
    /** Configuration */
    @Autowired
    private NettyProperties nettyProperties;

    /** Invokes the requested method and writes the result back to the client. */
    public void dispatcher(final ChannelHandlerContext ctx, final MethodInvokeMeta invokeMeta) {
        // Run the call on the thread pool so the event loop is not blocked
        executor.submit(() -> {
            ChannelFuture channelFuture = null;
            try {
                // Interface class
                Class<?> interfaceClass = invokeMeta.getInterfaceClass();
                // Method name
                String methodName = invokeMeta.getMethodName();
                // Argument array
                Object[] args = invokeMeta.getArgs();
                // Parameter types
                Class<?>[] parameterTypes = invokeMeta.getParameterTypes();
                // Target object: the Spring bean that implements the interface
                Object targetObject = applicationContext.getBean(interfaceClass);
                // Target method
                Method method = targetObject.getClass().getMethod(methodName, parameterTypes);
                // Invoke the method
                Object obj = method.invoke(targetObject, args);
                if (obj == null) {
                    // null cannot be serialized, so send a placeholder instead
                    channelFuture = ctx.writeAndFlush(NullWritable.nullWritable());
                } else {
                    channelFuture = ctx.writeAndFlush(obj);
                }
                // Close the connection once the response is written
                channelFuture.addListener(ChannelFutureListener.CLOSE);
            } catch (Exception ex) {
                CommonResult<Object> commonResult = new CommonResult<>(CodeEnum.UNKOWN_ERROR);
                channelFuture = ctx.writeAndFlush(commonResult);
                channelFuture.addListener(ChannelFutureListener.CLOSE);
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        executor = Executors.newFixedThreadPool(nettyProperties.getMaxThreads());
    }
}

NettyProperties.java: configuration properties

package com.jd.wyjm.netty.server.core.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "netty.server")
@Data
public class NettyProperties {
    /** Port */
    private Integer port;
    /** Maximum number of threads */
    private Integer maxThreads;
    /** Maximum packet length */
    private Integer maxLength;
}

ServerNettyRunner.java: starts the RPC service

package com.jd.wyjm.netty.server.core;

import com.jd.wyjm.netty.common.entity.ObjectCodec;
import com.jd.wyjm.netty.server.core.adapter.ServerChannelHandlerAdapter;
import com.jd.wyjm.netty.server.core.config.NettyProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;

@Slf4j
@Component
public class ServerNettyRunner implements CommandLineRunner {
    /** Server bootstrap */
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
    /** Boss group: accepts connections */
    private final EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
    /** Worker group: handles I/O on accepted connections */
    private final EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
    /** Channel handler adapter */
    @Resource
    private ServerChannelHandlerAdapter serverChannelHandlerAdapter;
    /** Configuration */
    @Autowired
    private NettyProperties nettyProperties;

    @PreDestroy
    public void close() {
        log.info("Shutting down the server.....");
        // Shut down the boss group
        bossLoopGroup.shutdownGracefully();
        // Shut down the worker group
        workerLoopGroup.shutdownGracefully();
    }

    /**
     * Starts the Netty server.
     *
     * @param strings command-line arguments
     * @throws Exception if startup fails
     */
    @Override
    public void run(String... strings) throws Exception {
        serverBootstrap.group(bossLoopGroup, workerLoopGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO));
        try {
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline channelPipeline = socketChannel.pipeline();
                    channelPipeline.addLast(
                            new LengthFieldBasedFrameDecoder(nettyProperties.getMaxLength(), 0, 2, 0, 2));
                    channelPipeline.addLast(new LengthFieldPrepender(2));
                    channelPipeline.addLast(new ObjectCodec());
                    channelPipeline.addLast(serverChannelHandlerAdapter);
                }
            });
            log.info("Netty server starting to listen on port [{}]", nettyProperties.getPort());
            ChannelFuture channelFuture = serverBootstrap.bind(nettyProperties.getPort()).sync();
            // Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("[exception] releasing resources", e);
            bossLoopGroup.shutdownGracefully();
            workerLoopGroup.shutdownGracefully();
        }
    }
}
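ServerRequestDispatcher boils down to three reflective steps: look up the object that implements the requested interface, resolve the method from its name and parameter types, and invoke it with the received arguments. Below is a minimal sketch of those steps without Spring or Netty. DispatchSketch, Calculator, and SimpleCalculator are hypothetical names, and a plain Map stands in for applicationContext.getBean.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class DispatchSketch {

    /** Hypothetical service interface. */
    interface Calculator {
        int add(int a, int b);
    }

    /** Hypothetical implementation registered on the "server". */
    static class SimpleCalculator implements Calculator {
        @Override
        public int add(int a, int b) {
            return a + b;
        }
    }

    /** Stand-in for the Spring application context: interface -> implementation. */
    private final Map<Class<?>, Object> registry = new HashMap<>();

    <T> void register(Class<T> iface, T implementation) {
        registry.put(iface, implementation);
    }

    /** Resolves and invokes the requested method, mirroring the fields of MethodInvokeMeta. */
    Object dispatch(Class<?> iface, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Object target = registry.get(iface);
        if (target == null) {
            throw new IllegalArgumentException("No implementation registered for " + iface.getName());
        }
        try {
            // Resolving on the interface avoids surprises when the target is itself a proxy
            Method method = iface.getMethod(methodName, parameterTypes);
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // The business method itself threw; surface its cause
            throw new IllegalStateException("Target method threw", e.getCause());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        DispatchSketch dispatcher = new DispatchSketch();
        dispatcher.register(Calculator.class, new SimpleCalculator());
        Object result = dispatcher.dispatch(Calculator.class, "add",
                new Class<?>[]{int.class, int.class}, new Object[]{2, 3});
        System.out.println(result); // prints 5
    }
}
```

The parameter types travel with the request because the method name alone cannot distinguish overloads.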

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-server-core
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-common:1.0-SNAPSHOT
        org.springframework.boot:spring-boot-starter-web

  wyjm-netty-server-provider: the provider (proxy) module, which wraps every RPC interface exposed to the outside in one place.

UserRpcServiceImpl.java: implementation of the RPC interface.

package com.jd.wyjm.netty.server.provider.service;

import com.jd.wyjm.netty.server.service.UserRpcService;
import com.jd.wyjm.netty.server.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class UserRpcServiceImpl implements UserRpcService {
    @Autowired
    private UserService userService;

    @Override
    public Long inserUser(String userName) {
        // Delegate to the service layer
        return userService.insertUser(userName);
    }
}

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-server-provider
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-server-api:1.0-SNAPSHOT
        com.jd.wyjm.netty:wyjm-netty-common:1.0-SNAPSHOT
        com.jd.wyjm.netty:wyjm-netty-server-service:1.0-SNAPSHOT

  wyjm-netty-server-service: the service layer module.

UserServiceImpl.java: user service implementation.

package com.jd.wyjm.netty.server.service.impl;

import com.jd.wyjm.netty.server.service.UserService;
import org.springframework.stereotype.Service;

@Service
public class UserServiceImpl implements UserService {
    @Override
    public Long insertUser(String userName) {
        System.out.println("Inserting user data");
        // The current timestamp stands in for a generated user ID
        return System.currentTimeMillis();
    }
}

UserService.java: user service interface

package com.jd.wyjm.netty.server.service;

public interface UserService {
    /**
     * Inserts a user record.
     *
     * @param userName the user's name
     */
    Long insertUser(String userName);
}

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-server-service
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-server-api:1.0-SNAPSHOT
        com.jd.wyjm.netty:wyjm-netty-server-core:1.0-SNAPSHOT

  wyjm-netty-server-web: the application startup module.

WebApplication.java: application entry point

package com.jd.wyjm.netty.server.web.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = {
        "com.jd.wyjm.netty.common.*",
        "com.jd.wyjm.netty.server.core",
        "com.jd.wyjm.netty.server.core.*",
        "com.jd.wyjm.netty.server.web.*",
        "com.jd.wyjm.netty.server.service.*",
        "com.jd.wyjm.netty.server.provider.service"
})
public class WebApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebApplication.class, args);
    }
}

application.yml

debug: true
spring:
  profiles:
    active: test

application-test.yml

debug: false
server:
  port: 9999
  context-path: /
netty:
  server:
    port: 2020
    maxLength: 65535
    maxThreads: 100
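The maxLength value of 65535 lines up with the framing used on both sides: LengthFieldPrepender(2) writes a 2-byte length before each message, and a 2-byte unsigned length cannot describe more than 65535 bytes. The sketch below shows the same framing idea with only java.nio.ByteBuffer. LengthFrameSketch is a hypothetical class written for illustration, not Netty's implementation, and it ignores details such as Netty's maximum frame length check.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class LengthFrameSketch {
    /** Largest payload a 2-byte unsigned length field can describe. */
    static final int MAX_PAYLOAD = 0xFFFF;

    /** Prepends a 2-byte big-endian length, as a length-prepending encoder would. */
    static byte[] encode(byte[] payload) {
        if (payload.length > MAX_PAYLOAD) {
            throw new IllegalArgumentException("Payload too large for a 2-byte length: " + payload.length);
        }
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort((short) payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        return buf.array();
    }

    /**
     * Extracts every complete frame from a buffer in read mode and strips the length field.
     * An incomplete trailing frame is left unread so it can be completed by later data.
     */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= 2) {
            in.mark();
            int length = in.getShort() & 0xFFFF; // read the length as unsigned
            if (in.remaining() < length) {
                in.reset(); // wait for the rest of the frame
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(payload);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("hi".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("netty".getBytes(StandardCharsets.UTF_8));
        ByteBuffer stream = ByteBuffer.allocate(first.length + second.length);
        stream.put(first).put(second).flip();
        for (byte[] frame : decode(stream)) {
            System.out.println(new String(frame, StandardCharsets.UTF_8));
        }
    }
}
```

Because of the 2-byte field, the client's larger limit of 1024 * 1024 in ClientChannelInitializer never comes into play; a serialized request or response above 65535 bytes would need a wider length field on both sides.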

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-server-web
    dependencies:
        com.jd.wyjm.netty:wyjm-netty-server-core:1.0-SNAPSHOT
        com.jd.wyjm.netty:wyjm-netty-server-provider:1.0-SNAPSHOT

pom.xml: the server's parent pom

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-server
    modules: wyjm-netty-server-api, wyjm-netty-server-core, wyjm-netty-server-provider, wyjm-netty-server-service, wyjm-netty-server-web

pom.xml: the root pom of wyjm-netty

    modelVersion: 4.0.0
    parent: org.springframework.boot:spring-boot-starter-parent:1.5.3.RELEASE
    groupId: com.jd.wyjm.netty
    artifactId: wyjm-netty
    version: 1.0-SNAPSHOT
    packaging: pom
    modules: wyjm-netty-common, wyjm-netty-client, wyjm-netty-server
    dependencies:
        org.projectlombok:lombok
        org.springframework.boot:spring-boot-starter-test (scope: test)

Common module (wyjm-netty-common):

CommonResult.java: unified response wrapper. For now it is used only for the error response in ServerRequestDispatcher, so it must be serializable.

package com.jd.wyjm.netty.common.core;

import com.jd.wyjm.netty.common.enums.CodeEnum;
import lombok.Data;
import lombok.ToString;

import java.io.Serializable;

/**
 * Response result
 *
 * @create 2018-8-22
 * @author zhangqiang200
 */
@Data
@ToString
public class CommonResult<T> implements Serializable {
    /** Response code; defaults to success */
    private int code = CodeEnum.SUCCESS.getCode();
    /** Response code description; defaults to success */
    private String message = CodeEnum.SUCCESS.getMessage();
    /** Response body; may be null */
    private T data;

    public CommonResult() {
        this(CodeEnum.SUCCESS.getCode(), CodeEnum.SUCCESS.getMessage(), null);
    }

    public CommonResult(CodeEnum codeEnum) {
        this(codeEnum.getCode(), codeEnum.getMessage(), null);
    }

    /** The returned code defaults to CodeEnum.SUCCESS.getCode() */
    public CommonResult(T data) {
        this(CodeEnum.SUCCESS.getCode(), CodeEnum.SUCCESS.getMessage(), data);
    }

    /** The returned data defaults to null */
    public CommonResult(int code, String message) {
        this(code, message, null);
    }

    public CommonResult(int code, String message, T data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }

    /** Returns whether the result indicates success */
    public boolean success() {
        return code == CodeEnum.SUCCESS.getCode();
    }
}

MethodInvokeMeta.java: metadata of the invoked method.

package com.jd.wyjm.netty.common.entity;

import lombok.Data;

import java.io.Serializable;

/**
 * Metadata of the invoked method
 *
 * @author 单纯的心
 */
@Data
public class MethodInvokeMeta implements Serializable {
    /** Interface */
    private Class<?> interfaceClass;
    /** Method name */
    private String methodName;
    /** Arguments */
    private Object[] args;
    /** Return type */
    private Class<?> returnType;
    /** Parameter types */
    private Class<?>[] parameterTypes;
}

NullWritable.java: placeholder for a null return value.

package com.jd.wyjm.netty.common.entity;

import java.io.Serializable;

/**
 * Placeholder sent over the wire when the remote method returns null
 */
public class NullWritable implements Serializable {
    private static final NullWritable instance = new NullWritable();

    private NullWritable() {
    }

    public static NullWritable nullWritable() {
        return instance;
    }
}

ObjectCodec.java: message codec

package com.jd.wyjm.netty.common.entity;

import com.jd.wyjm.netty.common.utils.ObjectSerializerUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;

import java.util.List;

/**
 * Message codec: converts between Java objects and bytes
 *
 * @author zhangqiang35
 */
public class ObjectCodec extends MessageToMessageCodec<ByteBuf, Object> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> out)
            throws Exception {
        byte[] data = ObjectSerializerUtils.serilizer(msg);
        out.add(Unpooled.wrappedBuffer(data));
    }

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List<Object> out)
            throws Exception {
        byte[] bytes = new byte[msg.readableBytes()];
        msg.readBytes(bytes);
        Object deSerilizer = ObjectSerializerUtils.deSerilizer(bytes);
        out.add(deSerilizer);
    }
}

CodeEnum.java: response code enum

package com.jd.wyjm.netty.common.enums;

/**
 * Response codes
 *
 * @author zhangqiang35
 */
public enum CodeEnum {
    /** Success */
    SUCCESS(200, "Success"),
    /** Unknown error */
    UNKOWN_ERROR(999, "Unknown error");

    /** Response code */
    private final int code;
    /** Response message */
    private final String message;

    CodeEnum(int _code, String _message) {
        this.code = _code;
        this.message = _message;
    }

    public int getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }
}

ObjectSerializerUtils.java: object serialization utility

package com.jd.wyjm.netty.common.utils;

import java.io.*;

public class ObjectSerializerUtils {

    /**
     * Deserializes an object.
     *
     * @param data the serialized bytes
     * @return the object, or null if the input is empty or deserialization fails
     */
    public static Object deSerilizer(byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Serializes an object.
     *
     * @param obj the object to serialize
     * @return the serialized bytes, or null if the input is null or serialization fails
     */
    public static byte[] serilizer(Object obj) {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
            oos.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
}

WrapMethodUtils.java: wraps a method call into metadata

package com.jd.wyjm.netty.common.utils;

import com.jd.wyjm.netty.common.entity.MethodInvokeMeta;

import java.lang.reflect.Method;

/**
 * @author zhangqiang35
 */
public class WrapMethodUtils {

    /**
     * Builds the metadata for a method call.
     *
     * @param interfaceClass the interface being called
     * @param method         the method being called
     * @param args           the call arguments
     * @return the populated metadata
     */
    public static MethodInvokeMeta readMethod(Class<?> interfaceClass, Method method, Object[] args) {
        MethodInvokeMeta methodInvokeMeta = new MethodInvokeMeta();
        methodInvokeMeta.setInterfaceClass(interfaceClass);
        methodInvokeMeta.setArgs(args);
        methodInvokeMeta.setMethodName(method.getName());
        methodInvokeMeta.setReturnType(method.getReturnType());
        Class<?>[] parameterTypes = method.getParameterTypes();
        methodInvokeMeta.setParameterTypes(parameterTypes);
        return methodInvokeMeta;
    }
}
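Everything that crosses the wire in this project goes through JDK serialization, so every request field, argument, and return value has to implement Serializable. The sketch below shows the round trip in isolation. SerializationSketch and CallInfo are hypothetical names, with CallInfo as a cut-down stand-in for MethodInvokeMeta. Unlike ObjectSerializerUtils, the sketch rethrows failures instead of returning null, which is a design choice for illustration rather than the article's behavior.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializationSketch {

    /** Hypothetical request payload, a cut-down stand-in for MethodInvokeMeta. */
    static class CallInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String methodName;
        final Object[] args;

        CallInfo(String methodName, Object[] args) {
            this.methodName = methodName;
            this.args = args;
        }
    }

    /** Serializes an object graph to bytes; fails loudly if any part is not Serializable. */
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Rebuilds the object graph from bytes produced by serialize. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class missing on the receiving side", e);
        }
    }

    public static void main(String[] args) {
        CallInfo sent = new CallInfo("inserUser", new Object[]{"Tom"});
        CallInfo received = (CallInfo) deserialize(serialize(sent));
        System.out.println(received.methodName + " " + received.args[0]); // prints "inserUser Tom"
    }
}
```

Note that JDK deserialization instantiates whatever classes the byte stream names, so a setup like this is only reasonable between services that trust each other.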

pom.xml

    parent: com.jd.wyjm.netty:wyjm-netty:1.0-SNAPSHOT
    modelVersion: 4.0.0
    artifactId: wyjm-netty-common
    dependencies:
        io.netty:netty-all:5.0.0.Alpha2

This completes the RPC interface call implemented with Netty.


